Performance Analysis Results¶ Notes These performance statistics were taken when the load average was below 3.8 in the 4 core instance. Most of the examples given below use log sinks to log statistics for performance monitoring purposes. However, note that log sinks incur a high system overhead and can lower performance by even more than 50%. Performance analysis results summary¶ The recommended CPU and memory specifications for Docker containers are as follows: CPU: 4 cores Memory: 8GB The recommended memory specifications SI server as follows. These are configured in the <SI_HOME>/wso2/server/bin/carbon.sh file. Xms: 2GB Xmx: 4GB The exact specifications used in the use cases listed in this section are summarised in the table below: Scenario CPU Memory SI Memory Allocation Input TPS Input Message Size Output TPS Consuming events using Kafka source 4 cores 8GB - Xms: 2g - Xmx: 4g 180K 60 bytes Not Available Consuming messages from an HTTP Source 4 cores 8GB - Xms: 2g - Xmx: 4g 30K 60 bytes Not Available Sending HTTP requests and consuming the responses 4 cores 8GB - Xms: 2g - Xmx: 4g 29K Sent: 60 bytesReceived: 60 bytes - To HTTP Source: 29K - To HTTP Request Sink: 29K Performing ETL tasks 4 cores 16GB - Xms: 2g - Xmx: 4g 29K Read: 100 bytesStored: 200 bytes To Oracle Store: 72K Consuming messages from a Kafka source and publish to an HTTP endpoint 2 cores Docker Memory: 3GB - Xms: 256m - Xmx: 1g 10K Consumed: 400 bytesPublished: 600 bytes 10K Consuming messages from a CSV file and publish to a MySQL table 4 cores Docker Memory: 8GB - Xms: 2g - Xmx: 4g 9K Read: 300 bytesPublished: 300 bytes 9K Monitoring a database table in MySQL and publishing data to a Kafka topic 4 cores Docker Memory: 8GB - Xms: 2g - Xmx: 4g 13K Read: 300 bytesPublished: 300 bytes 13K Read XML file and mapping to a stream 4 cores Docker Memory: 8GB - Xms: 2g - Xmx: 4g 40K read: 350 bytes 40K Reading an XML file and publishing to a Kafka topic 4 cores Docker Memory: 8GB - Xms: 2g - Xmx: 4g 38K read: 350 bytesPublished: 350 bytes 38K Consuming events using Kafka source¶ Specifications of EC2 Instances¶ Stream Processor : c5.xLarge Kafka server : c5.xLarge Kafka publisher : c5.xLarge Siddhi Application¶ @App:name("HelloKafka") @App:description('Consume events from a Kafka Topic and publish to a different Kafka Topic') @source(type='kafka', topic.list='kafka_topic', partition.no.list='0', threading.option='single.thread', group.id="group", bootstrap.servers='172.31.0.135:9092', @map(type='json')) define stream SweetProductionStream (name string, amount double); @sink(type='log') define stream KafkaSourceThroughputStream(count long); from SweetProductionStream#window.timeBatch(5 sec) select count(*)/5 as count insert into KafkaSourceThroughputStream; Results¶ Average Consuming TPS from Kafka: 180K Consuming messages from an HTTP Source¶ Specifications of EC2 Instances¶ Stream Processor : c5.xLarge JMeter : c5.xLarge Siddhi Application¶ @App:name("HttpSource") @App:description('Consume events from http clients') @source(type='http', worker.count='20', receiver.url='http://172.31.2.99:8081/service', @map(type='json')) define stream SweetProductionStream (name string, amount double); @sink(type='log') define stream HttpSourceThroughputStream(count long); from SweetProductionStream#window.timeBatch(5 sec) select count(*)/5 as count insert into HttpSourceThroughputStream; Results¶ Average Consuming TPS from Http Source: 30K Sending HTTP requests and consuming the responses¶ Specifications of EC2 Instances¶ Stream Processor : c5.xLarge JMeter : c5.xLarge Web server : c5.xLarge Siddhi Application¶ @App:name("HttpRequestResponse") @App:description('Consume events from an HTTP source, ') @source(type='http', worker.count='20', receiver.url='http://172.31.2.99:8081/service', @map(type='json')) define stream SweetProductionStream (name string, amount double); @sink(type='http-request', l, sink.id='production-request', publisher.url='http://172.17.0.1:8688//netty_echo_server', @map(type='json')) define stream HttpRequestStream (batchNumber double, lowTotal double); @source(type='http-response' , sink.id='production-request', http.status.code='200', @map(type='json')) define stream HttpResponseStream(batchNumber double, lowTotal double); @sink(type='log') define stream FinalThroughputStream(count long); @sink(type='log') define stream InputThroughputStream(count long); from SweetProductionStream select 1D as batchNumber, 1200D as lowTotal insert into HttpRequestStream; from SweetProductionStream#window.timeBatch(5 sec) select count(*)/5 as count insert into InputThroughputStream; from HttpResponseStream#window.timeBatch(5 sec) select count(*)/5 as count insert into FinalThroughputStream; Results¶ Average Consuming TPS to HTTP Source : 29K Average Publishing TPS from HTTP request sink : 29K Average Consuming TPS from HTTP response source: 29K Performing ETL tasks¶ Specifications of EC2 Instances¶ Stream Processor : m4.xlarge JMeter : m4.xlarge Web server : m4.xlarge Siddhi Application¶ This scenario was tested using two Siddhi applications that execute the process explained below. The two Siddhi applications are as follows: ETLFIleRecordsCopier.siddhi @App:name('ETLFileRecordsCopier') @App:description('This sample demonstrates on integrating a File in a particular location with a Database.') @source(type='file', mode='LINE', dir.uri='file:/Users/wso2/demo/accurate-files', action.after.process='MOVE', move.after.process='file:/Users/wso2/demo/moved', tailing='false', header.present='true', @map( type='csv', delimiter='|', @attributes(code = '0', serialNo = '1', amount = '2', fileName = 'trp:file.path', eof = 'trp:eof'))) define stream FileReaderStream (code string, serialNo string, amount double, fileName string, eof string); -- Reads from file @Store(type="rdbms", jdbc.url="jdbc:mysql://localhost:3306/batchInformation?useSSL=false", username="root", password="root" , jdbc.driver.name="com.mysql.cj.jdbc.Driver", isAutoCommit = 'true') define table AccurateBatchTable(serialNo string, amount double, fileName string, status string, timestamp long); @sink(type='log', prefix='File to DB copying has Started: ') define stream FileReadingStartStream(fileName string); @sink(type='log', prefix='File to DB copying has Finished: ') define stream FileReadingEndStream(fileName string); from FileReaderStream select serialNo, amount, fileName, "test" as status, eventTimestamp() as timestamp, count() as rowNumber, eof insert into DataStream; from DataStream select * insert into DataStreamPassthrough; -- Write to DB Passthrough from DataStreamPassthrough#window.externalTimeBatch(timestamp, 5 sec, timestamp, 10 sec) select serialNo, amount, fileName, status, timestamp, rowNumber, eof insert into TemporaryTablePassthroughStream; -- Log First Record from TemporaryTablePassthroughStream[rowNumber == 1] select fileName insert into FileReadingStartStream; -- Log Every 100000th Record from TemporaryTablePassthroughStream select fileName, rowNumber as rows insert into FileReadingInProgressStream; -- Log Last Record from TemporaryTablePassthroughStream[eof == 'true'] select fileName insert into FileReadingEndStream; -- Write to DB from TemporaryTablePassthroughStream#window.batch() select serialNo, amount, fileName, status, timestamp insert into AccurateBatchTable; ETLFileAnalyzer.siddhi @App:name('ETLFileAnalyzer') @App:description('This sample demonstrates on moving files to a specific location comparing its content with the header values.') @source(type='file', mode='REGEX', dir.uri='file:/Users/wso2/demo/new', action.after.process='MOVE', move.after.process='file:/Users/wso2/demo/header-processed', tailing='false', @map( type='text', fail.on.missing.attribute = 'false', regex.A='HDprod-[a-zA-z]*-[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]-([0-9]+)', @attributes( expectedRowCount = 'A[1]', fileName = 'trp:file.path'))) define stream HeaderReaderStream (fileName string, expectedRowCount long); @source(type='file', mode='LINE', dir.uri='file:/Users/wso2/demo/header-processed', tailing='false', header.present='true', @map( type='csv', delimiter='|', @attributes(code = '0', serialNo = '1', amount = '2', fileName = 'trp:file.path', eof = 'trp:eof'))) define stream FileReaderStream (code string, serialNo string, amount double, fileName string, eof string); @sink(type='log', prefix='Accurate Batch: ') define stream AccurateFileNotificationStream (fromPath string); @sink(type='log', prefix='Inaccurate Batch: ') define stream InaccurateFileNotificationStream (fromPath string); @sink(type='log', prefix='Batch checking started: ') define stream ExpectedRowCountsStream (fileName string, expectedRowCount long); define stream AnalyzingLogStream (fileName string, rowCount long); define table ExpectedRowCountsTable (fileName string, expectedRowCount long, existingRowCount long); @sink(type='log', prefix='Batch checking finished: ') define stream ExistingRowCountsStream (fileName string, existingRowCount long); -- Expected Row Count reader. Moves file from 'new' to 'header-processed' from HeaderReaderStream[NOT(expectedRowCount is null) and NOT(fileName is null)] select * insert into ExpectedRowCountsStream; from ExpectedRowCountsStream select fileName, expectedRowCount, -1L as existingRowCount insert into ExpectedRowCountsTable; -- Existing Row Count calculator. Moves file from 'header-processed' to 'rows-counted' from FileReaderStream select * insert into FileDataStream; partition with (fileName of FileDataStream) begin from FileDataStream select fileName, count() as rowCount, eof insert into #ThisFileRowCounts; from #ThisFileRowCounts select fileName, rowCount insert into AnalyzingLogStream; from #ThisFileRowCounts[eof == 'true'] select fileName, rowCount as existingRowCount insert into ExistingRowCountsStream; end; -- Existing vs. Expected Row Counts comparer from ExistingRowCountsStream as S inner join ExpectedRowCountsTable as T on str:replaceFirst(S.fileName, 'header-processed', 'new') == T.fileName select S.fileName as fromPath, T.expectedRowCount as expectedRowCount, S.existingRowCount as existingRowCount insert into FileInfoMatcherStream; from FileInfoMatcherStream select fromPath, existingRowCount update ExpectedRowCountsTable set ExpectedRowCountsTable.existingRowCount = existingRowCount on ExpectedRowCountsTable.fileName == fromPath; -- Accurate file mover from FileInfoMatcherStream[expectedRowCount == existingRowCount] select fromPath insert into AccurateFileStream; from AccurateFileStream#file:move(fromPath, '/Users/wso2/demo/accurate-files/') select fromPath insert into AccurateFileNotificationStream; -- Inaccurate batch file mover from FileInfoMatcherStream[expectedRowCount != existingRowCount] select fromPath insert into InaccurateFileStream; from InaccurateFileStream#file:move(fromPath, '/Users/wso2/demo/inaccurate-files/') select fromPath insert into InaccurateFileNotificationStream; For a detailed description of this scenario, see the Streaming ETL with WSO2 Integrator: SI article Results¶ The performance statistics of this scenario are as follows: Lines : 6,140,031 Size : 124MB Database : AWS RDS instance with oracle-ee 12.1.0.2.v15 Duration : 1.422 minutes (85373ms) Consuming messages from a Kafka source and publish to an HTTP endpoint¶ Specifications of EC2 Instances¶ Docker resource allocation¶ Memory 3GB CPU 2 Cores Server memory allocation¶ Xms 256m Xmx 1g Siddhi applications¶ The following Siddhi applications were used in this scenario: To read messages from a Kafka topic, do a transformation and insert into an in-memory topic: @App:name('kafka-consumer') @App:description('Reads messages from kafka topics and puts into in-memory-input topic') @sink(type = 'inMemory', topic = "in-memory-input", @map(type = 'passThrough')) define stream ToInMemoryInput (kafkaConsumerInTS long, kafkaConsumerOutTS long, locations string, material string, createdDate string, sid string, headline string, body string, publishTS long, id string); @source(type = 'kafka', topic.list = "test3", threading.option = "single.thread", group.id = "group1", bootstrap.servers = "172.31.39.91:9092", optional.configuration = "auto.offset.reset:latest", @map(type = 'json', fail.on.missing.attribute = "false", enclosing.element = "$")) define stream FromKafkaMessage (locations string, material string, createdDate string, sid string, headline string, body string, publishTS string, id string, updatedDate string); @sink(type = 'log', prefix = '----------------------Kafka Consumer Throughput per second: ', @map(type = 'json')) define stream LogSink (totalEventsPerSec long); @info(name = 'Kafka Consumer Event Timestamp') from FromKafkaMessage select eventTimestamp() as kafkaConsumerInTS, time:timestampInMilliseconds() as kafkaConsumerOutTS, locations, material, str:replaceFirst(createdDate, 'Z', 'GMT') as createdDate, sid, headline, body, time:timestampInMilliseconds(str:replaceFirst(ifThenElse(publishTS is null, updatedDate, publishTS), 'Z', 'GMT'), "yyyy-MM-dd'T'HH:mm:ss.SSSZ") as publishTS, ifThenElse( id is null, 'null', id) as id insert into ToInMemoryInput; from FromKafkaMessage#window.timeBatch(1 sec) select count() as totalEventsPerSec insert into LogSink; To filter dynamic headers from incoming data stream @App:name('Intermediate-process') @App:description('Filter dynamic headers from incoming data stream') @sink(type = "inMemory", topic = "in-memory-output", @map(type = "passThrough")) define stream ToInMemoryOutput (sid string, connectionId string, headers string, data string); @source(type = 'inMemory', topic = "in-memory-input", @map(type = 'passThrough')) define stream FromInMemoryInput (kafkaConsumerInTS long, kafkaConsumerOutTS long, locations object, material object, createdDate string, sid string, headline string, body string, publishTS long, id string); @info(name = 'Filter Heards Messages') from FromInMemoryInput select sid, "test_connectionId" as connectionId, "'connectionId:test_connectionId','appKey:workManWork','Content-type:application/json'" as headers, str:fillTemplate(""" { "type": "heards_sub_resp", "publishTS": {{ publishTS }}, "dynamicAppInTS": {{ dynamicAppInTS }}, "dynamicAppOutTS": {{ dynamicAppOutTS }}, "kafkaConsumerInTS": {{ kafkaConsumerInTS }}, "kafkaConsumerOutTS": {{ kafkaConsumerOutTS }}, "headline":"{{ headline }}", "body":"{{ body }}", "id": "{{ id }}", "material": {'jinja2content': True}, "locations": {{ locations }}, "createdDate": "{{ createdDate }}", "sid":"{{ sid }}", "correlationId":"{{ correlationId }}" }""", map:create( 'headline', headline, 'body', body, 'id', id, 'material', json:getString(material, '$'), 'locations', json:getString(locations, '$'), 'createdDate', str:replaceFirst(createdDate, 'GMT', 'Z'), 'sid', sid, 'publishTS', publishTS, 'dynamicAppOutTS', time:timestampInMilliseconds(), 'dynamicAppInTS', eventTimestamp(), 'kafkaConsumerInTS', kafkaConsumerInTS, 'kafkaConsumerOutTS', kafkaConsumerOutTS, 'correlationId', 'Test123')) as data insert into ToInMemoryOutput; - To read output messages from the in-memory-output topic and publish them to the HTTP client @App:name('ws-publisher') @App:description('Reads from in-memory-output topic and publishes messages to client') @source(type = 'inMemory', topic = "in-memory-output", @map(type = 'passThrough')) define stream fromInMemoryOutput (sid string, connectionId string, headers string, data string); @sink(type = 'http', method = "POST", publisher.url = "http://172.31.39.177:8280/services/TestProxy", headers = "{{ headers }}", on.error = "LOG", max.pool.active.connections="1000", ssl.verification.disabled = "true", @map(type = 'json', @payload("""{"data":{{ data }} }"""))) define stream ToWsClient (data string, wsPublisherOutTS long, headers string, connectionId string, sid string); @sink(type = 'log', prefix = '----------------------WS Publisher Throughput per second: ', @map(type = 'json')) define stream LogSink (totalEventsPerSec long); @info(name = 'Add Filtered Message Timestamp') from fromInMemoryOutput select json:toString(json:setElement(json:setElement(json:toObject(data), '$', eventTimestamp(), 'wsPublisherInTS'), '$', time:timestampInMilliseconds(), 'wsPublisherOutTS')) as data, time:timestampInMilliseconds() as wsPublisherOutTS, headers, connectionId, sid insert into ToWsClient; from ToWsClient#window.timeBatch(1 sec) select count() as totalEventsPerSec insert into LogSink; Results¶ Memory consumed: 1g TPS: 10,000 Consuming messages from a CSV file and publish to a MySQL table¶ Specifications of EC2 Instances¶ Docker resource allocation¶ Memory 8GB CPU 4 Cores Server memory allocation¶ Xms 2g Xmx 4g Siddhi application¶ @App:name("FileToRdbms") @App:description("Description of the plan") @store(type='rdbms' , jdbc.url='jdbc:mysql://172.31.18.173:3306/purchesOrder?useSSL=false',username='root',password='root',jdbc.driver.name='com.mysql.cj.jdbc.Driver') define table PurchesOrderTable (orderID string, numberOfItems int, totalValue double, paymentStatus string, deliveryAddress string ); @source(type='file', mode='line', file.uri='file:/home/ubuntu/csv/productTable.csv', tailing='false', action.after.process='MOVE', move.after.process='file:/home/ubuntu/csv/moved', @map(type='csv', delimiter=',')) define stream InventoryUpdate (orderID string, numberOfItems int, totalValue double, paymentStatus string, deliveryAddress string); @async(buffer.size='4096', workers='2', batch.size.max='5000') define stream IntrimEventStream(orderID string, numberOfItems int, totalValue double, paymentStatus string, deliveryAddress string); from InventoryUpdate select * insert into IntrimEventStream; from IntrimEventStream select * insert into PurchesOrderTable; from InventoryUpdate#window.timeBatch(1 sec) select count() as throughput insert into OutputStream; from OutputStream#log('TPS: ') insert into TempStream; Results¶ Memory consumed: 2.56g TPS: 9,000 Monitoring a database table in MySQL and publishing data to a Kafka topic¶ Specifications of EC2 Instances¶ Docker resource allocation¶ Memory 8GB CPU 4 Cores Server memory allocation¶ Xms 2g Xmx 4g Siddhi applications¶ @App:name("PurchaseOrderSiddhiApp") @App:description("Description of the plan") --@sink(type='log') @source(type = 'cdc', url = "jdbc:mysql://172.31.18.173:3306/order?useSSL=false", username = "root", password = "root", table.name = "PurchesOrders", operation = "insert", @map(type = 'keyvalue', fail.on.missing.attribute = "false")) define stream PurchesOrderStream (orderID string, numberOfItems int, totalValue double, paymentStatus string, deliveryAddress string ); @sink(type='kafka', topic='delivery_items_topic', bootstrap.servers='172.31.3.169:9092', partition.no='0', @map(type='xml')) define stream kafkaPublisherStream(orderID string, numberOfItems int, totalPayable double, deliveryAddress string); @sink(type='log') define stream kafkPubTps(pubCount long); @sink(type='log') define stream publishTps(recCount long); from PurchesOrderStream[paymentStatus =='cod' or paymentStatus=='paid'] select orderID, numberOfItems, ifThenElse(paymentStatus=='cod', totalValue, 0.0) as totalPayable, deliveryAddress insert into kafkaPublisherStream; from PurchesOrderStream#window.timeBatch(1 sec) select count() as recCount insert into publishTps; from kafkaPublisherStream#window.timeBatch(1 sec) select count() as pubCount insert into kafkPubTps; Results¶ Memory Consumption: 1.5g Time taken: 46 minutes Data set size: 34,330,327 Read XML file and mapping to a stream¶ Specifications of EC2 Instances¶ Docker resource allocation¶ Memory 8GB CPU 4 Cores Server memory allocation¶ Xms 2g Xmx 4g Siddhi applications¶ @App:name("NodesConvertor") @App:description("Description of the plan") @source( type = 'file', file.uri = "file:/home/ubuntu/csv/input.xml", mode = "line", tailing = "false", action.after.process='keep', @map(type='xml', enclosing.element="/osm/node", enclosingElementAsEvent="true", enable.streaming.xml.content="true", fail.on.missing.attribute="false", @attributes(id = "/node/@id", lat = "/node/@lat", lon = "/node/@lon", version = "/node/@version", timestamp = "/node/@timestamp", changeset = "/node/@changeset"))) define stream FooStream (id string, lat string, lon string, version string, timestamp string, changeset string); @info(name = 'totalQuery') from FooStream#window.timeBatch(1 sec) select count() as throughput insert into OutputStream; from OutputStream#log('TPS: ') insert into TempStream; Results¶ Memory consumption: 1.2g TPS: 40,000 Reading an XML file and publishing to a Kafka topic¶ Specifications of EC2 Instances¶ Docker resource allocation¶ Memory 8GB CPU 4 Cores Server memory allocation¶ Xms 2g Xmx 4g Siddhi applications¶ @App:name("NodesConvertor") @App:description("Description of the plan") @source( type = 'file', file.uri = "file:/home/ubuntu/csv/input.xml", mode = "line", tailing = "false", action.after.process='keep', @map(type='xml', enclosing.element="/osm/node", enclosingElementAsEvent="true", enable.streaming.xml.content="true", fail.on.missing.attribute="false", @attributes(id = "/node/@id", lat = "/node/@lat", lon = "/node/@lon", version = "/node/@version", timestamp = "/node/@timestamp", changeset = "/node/@changeset"))) define stream FooStream (id string, lat string, lon string, version string, timestamp string, changeset string); @sink(type='kafka', topic='kafka_result_topic', bootstrap.servers='172.31.3.169:9092', partition.no='0', @map(type='xml')) define stream kafkaStream(id string, lat string, lon string, version string, timestamp string, changeset string); @info(name = 'totalQuery') from FooStream#window.timeBatch(1 sec) select count() as throughput insert into OutputStream; from OutputStream#log('TPS: ') insert into TempStream; from FooStream select * insert into kafkaStream; Results¶ Memory consumption: 1.7g TPS: 38,000